SpringBoot整合任务调度框架Quartz的基础搭建 您所在的位置:网站首页 springboot 任务调度 SpringBoot整合任务调度框架Quartz的基础搭建

SpringBoot整合任务调度框架Quartz的基础搭建

2024-01-25 05:46| 来源: 网络整理| 查看: 265

Quartz的整体概括 什么是quartz

何为quartz,请看官网的说法:

Quartz is a richly featured, open source job scheduling library that can be integrated within virtually any Java application - from the smallest stand-alone application to the largest e-commerce system. Quartz can be used to create simple or complex schedules for executing tens, hundreds, or even tens-of-thousands of jobs; jobs whose tasks are defined as standard Java components that may execute virtually anything you may program them to do. The Quartz Scheduler includes many enterprise-class features, such as support for JTA transactions and clustering.

简单来说,quartz是一个开源任务调度库,可以用来创建简单或复杂的调度,低至十个多至数百万个。它是一个标准的java组件,支持JTA,集群等多种企业级功能。

市面上有很多定时任务框架在quartz的基础上做了二次开发,xxl-job(基于quartz),elastic-job(基于quartz和zk),所以quartz到底是怎么玩的,它有哪些特性,下面来聊一聊。

quartz的基本概念 任务(Job):实际要触发的事件 触发器(Trigger):用于设定时间规则 调度器(Scheduler):组合任务与触发器

quartz就这三样东西,我们新建作业,通过trigger设置规则触发,由scheduler进行整合,非常简单。

Springboot整合quartz的基础搭建

一般企业级项目开发都用的Springboot,下面就来讲一讲quartz整合Springboot的一些要点。

依赖

quartz版本2.3.0,springboot版本1.5.18.RELEASE

1.8 1.1.5 2.3.0 1.2.40 1.3.0 1.2.16 1.7.7 1.7.7 org.springframework.boot spring-boot-starter-jdbc org.springframework.boot spring-boot-starter-web mysql mysql-connector-java runtime com.alibaba druid-spring-boot-starter ${druid.version} org.quartz-scheduler quartz ${quartz.version} org.quartz-scheduler quartz-jobs ${quartz.version} org.springframework spring-context-support org.projectlombok lombok com.alibaba fastjson ${fastjson.version} org.mybatis.spring.boot mybatis-spring-boot-starter ${mybatis.version} org.apache.commons commons-lang3 3.3.2 org.slf4j slf4j-api ${slf4j-api.version} org.slf4j slf4j-log4j12 ${slf4j-log4j12.version} log4j log4j ${log4j.version} org.springframework.boot spring-boot-starter-aop org.springframework.boot spring-boot-starter-test test Configuration

通过AutowireCapableBeanFactory,使用spring注入的方式实现在job里注入spring的bean。

/** * 继承org.springframework.scheduling.quartz.SpringBeanJobFactory * 实现任务实例化方式 */ public static class AutowiringSpringBeanJobFactory extends SpringBeanJobFactory implements ApplicationContextAware { private transient AutowireCapableBeanFactory beanFactory; @Override public void setApplicationContext(final ApplicationContext context) { beanFactory = context.getAutowireCapableBeanFactory(); } /** * 将job实例交给spring ioc托管 * 我们在job实例实现类内可以直接使用spring注入的调用被spring ioc管理的实例 * * @param bundle * @return * @throws Exception */ @Override protected Object createJobInstance(final TriggerFiredBundle bundle) throws Exception { final Object job = super.createJobInstance(bundle); /** * 将job实例交付给spring ioc */ beanFactory.autowireBean(job); return job; } } /** * 配置任务工厂实例 * * @param applicationContext spring上下文实例 * @return */ @Bean public JobFactory jobFactory(ApplicationContext applicationContext) { /** * 采用自定义任务工厂 整合spring实例来完成构建任务 * see {@link AutowiringSpringBeanJobFactory} */ AutowiringSpringBeanJobFactory jobFactory = new AutowiringSpringBeanJobFactory(); jobFactory.setApplicationContext(applicationContext); return jobFactory; } /** * 配置任务调度器 * 使用项目数据源作为quartz数据源 * * @param jobFactory 自定义配置任务工厂 * @param dataSource 数据源实例 * @return * @throws Exception */ @Bean(destroyMethod = "destroy", autowire = Autowire.NO) public SchedulerFactoryBean schedulerFactoryBean(JobFactory jobFactory, DataSource dataSource) throws Exception { SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean(); //将spring管理job自定义工厂交由调度器维护 schedulerFactoryBean.setJobFactory(jobFactory); //设置覆盖已存在的任务 schedulerFactoryBean.setOverwriteExistingJobs(true); //项目启动完成后,等待2秒后开始执行调度器初始化 schedulerFactoryBean.setStartupDelay(2); //设置调度器自动运行 schedulerFactoryBean.setAutoStartup(true); //设置数据源,使用与项目统一数据源 schedulerFactoryBean.setDataSource(dataSource); //设置上下文spring bean name schedulerFactoryBean.setApplicationContextSchedulerContextKey("applicationContext"); //设置配置文件位置 schedulerFactoryBean.setConfigLocation(new ClassPathResource("/quartz.properties")); return schedulerFactoryBean; }

这里需要提到一点,由于job的初始化时是通过new出来的,不受spring的管理,无法接受业务相关的bean,故这里使用AutowireCapableBeanFactory实现了new出来的对象通过注解可注入受spring管理的bean了。

AbstractAutowireCapableBeanFactory#autowireBean

@Override public void autowireBean(Object existingBean) { // Use non-singleton bean definition, to avoid registering bean as dependent bean. RootBeanDefinition bd = new RootBeanDefinition(ClassUtils.getUserClass(existingBean)); bd.setScope(BeanDefinition.SCOPE_PROTOTYPE); bd.allowCaching = ClassUtils.isCacheSafe(bd.getBeanClass(), getBeanClassLoader()); BeanWrapper bw = new BeanWrapperImpl(existingBean); initBeanWrapper(bw); populateBean(bd.getBeanClass().getName(), bd, bw); }

由源码可知,此类调用了populateBean的方法用来装配bean。具体spring的bean的加载注册过程可参考spring.io。

通过schedulerFactoryBean的ConfigLocation来读取quartz的基本配置信息,注意quartz.properties配置文件一定要放在classpath下。

#调度器实例名称 org.quartz.scheduler.instanceName = quartzScheduler #调度器实例编号自动生成 org.quartz.scheduler.instanceId = AUTO #持久化方式配置 org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX #持久化方式配置数据驱动,MySQL数据库 org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate #quartz相关数据表前缀名 org.quartz.jobStore.tablePrefix = QRTZ_ #开启分布式部署 org.quartz.jobStore.isClustered = true #配置是否使用 org.quartz.jobStore.useProperties = false #分布式节点有效性检查时间间隔,单位:毫秒 org.quartz.jobStore.clusterCheckinInterval = 10000 #线程池实现类 org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool #执行最大并发线程数量 org.quartz.threadPool.threadCount = 10 #线程优先级 org.quartz.threadPool.threadPriority = 5 #配置为守护线程,设置后任务将不会执行 #org.quartz.threadPool.makeThreadsDaemons=true #配置是否启动自动加载数据库内的定时任务,默认true org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread = true

我们看到org.quartz.jobStore.class进行持久化配置设置成了JobStoreTX属性,需要建立数据库表进行任务信息的持久化。其实官方还有一种RAMJobStore用于存储内存中的调度信息,当进程终止时,所有调度信息都将丢失。本文使用JobStoreTX(需要建立quartz的大概10张表,建表语句传在了github)。

Job&JobDetail

JobDetail作为Job的实例,一般由静态方法JobBuilder创建,通过fluent风格链式构建了Job的各项属性,

其中newJob需要一个泛型上限为Job的入参。

// 构建job信息 JobDetail job = JobBuilder.newJob(DynamicQuartzJob.class) .withIdentity(jobKey) //jobName+jobGroup .withDescription(quartzJobDetails.getDescription()) .usingJobData("jobData", quartzJobDetails.getJobData()) .build();

而Job接口只有一个简单的方法:

public interface Job { void execute(JobExecutionContext context) throws JobExecutionException; }

当定时任务跑起来的时候,execute里的代码将会被执行。

比如我们创建一个简单的定时任务:

public class QuartzTest extends QuartzJobBean static Logger logger = LoggerFactory.getLogger(QuartzTest.class); { @Override protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException { logger.info("我是测试任务,我跑起来了,时间:{}",new Date()); }

注:QuartzJobBean是Job接口的实现类。

Trigger

JobDetail由JobBuilder类的静态方法构建,同样,Trigger触发器由TriggerBuilder的静态方法构建。

// 构建job的触发规则 cronExpression Trigger trigger = TriggerBuilder.newTrigger() .withIdentity(triggerKey) .startNow() .withSchedule(CronScheduleBuilder .cronSchedule(quartzJobDetails.getCronExpression())) .build();

Trigger触发器用于触发任务作业,当trigger触发器触发执行时,scheduler调度程序中的其中一个线程将调用execute()的一个线程。quartz最常用的触发器分为SimpleTrigger和CronTrigger触发器两种。 SimpleTrigger用于在给定时间执行一次作业,或给定时间每隔一段时间执行一次作业。这个功能Springboot的@scheduled注解也能实现。 如果是希望以日历时间表触发,则CronTrigger就比较合适例如每周六下午3点执行,我们完全可以用cron表达式实现日历触发的时间规则,cron表达式可由quartzJobDetails对象的CronExpression属性传入。

最后,别忘了用scheduler将job和trigger整合起来,因为他们是统一协作的:

// 注册job和trigger信息 scheduler.scheduleJob(job, trigger); JobDataMap

一般业务方法会要求动态传参处理,这时候就需要jobDataMap来进行参数传递了。我们在构建JobDetail的时候,通过

usingJobData("jobData", quartzJobDetails.getJobData())

动态传入调度任务所需的参数,以达到业务需求。

JobListener&TriggerListener

用于在任务调度期间,各阶段的状态解读。这里我就以JobListener为例,TriggerListener也是相似的。

首先,构建jobListener

jobContext.getScheduler().getListenerManager().addJobListener(new MyJobListener(), KeyMatcher.keyEquals(jobKey));

这里我是在executeInternal方法里面构建的,因为listner不会持久化,服务重启将会丢失监听。当然在构建job的时候也可以注册listener,如果没持久化监听的需求的话。

看一下MyJobListener:

public class MyJobListener implements JobListener { public static final String LISTENER_NAME = "MyJobListener"; // @Autowired // private JobScheduleLogMapper logMapper; @Override public String getName() { return LISTENER_NAME; //must return a name } //任务被调度前 @Override public void jobToBeExecuted(JobExecutionContext context) { String jobName = context.getJobDetail().getKey().toString(); // System.out.println("jobToBeExecuted"); System.out.println("Job调度前 : " + jobName + " is going to start..."); } //任务调度被拒了 @Override public void jobExecutionVetoed(JobExecutionContext context) { System.out.println("Job调度被拒:jobExecutionVetoed"); //todo:原因捕获 } //任务被调度后 @Override public void jobWasExecuted(JobExecutionContext context, JobExecutionException jobException) { // System.out.println("Job调度器:jobWasExecuted"); String jobName = context.getJobDetail().getKey().toString(); System.out.println("Job调度后 : " + jobName + " is finished..."); if (jobException!=null&&!jobException.getMessage().equals("")) { System.out.println("Exception thrown by: " + jobName + " Exception: " + jobException.getMessage()); } JobScheduleLog log = new JobScheduleLog(); log.setJobRuntime(String.valueOf(context.getJobRunTime())); log.setId(Optional.ofNullable(context.get("id")).map(p->Integer.parseInt(String.valueOf(context.get("id")))).orElse(null)); JobScheduleLogMapper logMapper = SpringContextHolder.getBean(JobScheduleLogMapper.class); logMapper.updateByPrimaryKeySelective(log); } }

任务调度前,调度后已经任务被拒,我们都可以使用钩子。

动态构建任务调度

下一个问题,我们知道新建一个调度job只要继承QuartzJobBean类并实现executeInternal就行,那么如果我有成百上千个任务,难道我要新建几千个类么?如果我想把已有的方法加入定时任务调度,难道我还要去改造原有的方法么? 必然不是的,这时候我们可以新建一个动态类继承QuartzJobBean,并新建自己的业务表(例如建一个jobCaller表),传入项目方法的全类路径,这样我们就可以executeInternal方法里通过读表拉取需要调度的任务方法,通过jobDataMap拿到参数,通过反射直接invoke目标方法了,这样就省去了大量的构建调度任务的工作了,并且可以在不动原有业务代码的基础上,定向指定任何一个方法加入任务调度了。 ok,talk is cheap, show me the code:

public class DynamicQuartzJob extends QuartzJobBean { @Autowired private JobScheduleLogManager jobManager; @Override protected void executeInternal(JobExecutionContext jobContext) { try { int i = jobManager.trans2JobLogBefore(jobContext); if (i


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有